Golang 中使用 ES
使用 Golang 操作 ES,需要使用官方提供的 go-elasticsearch
基本的 CRUD 操作
package main
import (
"context"
"encoding/json"
"log"
"strings"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
)
func main() {
// 设置Elasticsearch连接参数
cfg := elasticsearch.Config{
Addresses: []string{
"http://es17-dev.maizuo.com:9200",
},
}
// 创建Elasticsearch客户端
client, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("无法创建Elasticsearch客户端:%v", err)
}
// 创建一个新的索引
indexName := "sample_index"
createIndex(client, indexName)
// 插入文档
docID := "1"
document := map[string]interface{}{
"title": "示例文档",
"content": "这是一个示例文档的内容。",
}
insertDocument(client, indexName, docID, document)
// 获取文档
retrievedDoc, err := getDocument(client, indexName, docID)
if err != nil {
log.Printf("无法获取文档:%v", err)
} else {
log.Printf("检索到的文档:%v", retrievedDoc)
}
// 删除索引
deleteIndex(client, indexName)
}
func createIndex(client *elasticsearch.Client, indexName string) {
ctx := context.Background()
createIndexRequest := esapi.IndicesCreateRequest{
Index: indexName,
}
res, err := createIndexRequest.Do(ctx, client)
if err != nil {
log.Fatalf("无法创建索引:%v", err)
}
defer res.Body.Close()
if res.IsError() {
log.Fatalf("创建索引失败:%s", res.String())
}
log.Printf("索引 %s 创建成功", indexName)
}
func insertDocument(client *elasticsearch.Client, indexName, docID string, document map[string]interface{}) {
ctx := context.Background()
jsonData, err := json.Marshal(document)
if err != nil {
log.Fatalf("无法序列化文档:%v", err)
}
insertRequest := esapi.IndexRequest{
Index: indexName,
DocumentID: docID,
Body: strings.NewReader(string(jsonData)),
Refresh: "true",
}
res, err := insertRequest.Do(ctx, client)
if err != nil {
log.Fatalf("无法插入文档:%v", err)
}
defer res.Body.Close()
if res.IsError() {
log.Fatalf("插入文档失败:%s", res.String())
}
log.Printf("文档插入成功")
}
func getDocument(client *elasticsearch.Client, indexName, docID string) (map[string]interface{}, error) {
ctx := context.Background()
getRequest := esapi.GetRequest{
Index: indexName,
DocumentID: docID,
}
res, err := getRequest.Do(ctx, client)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.IsError() {
return nil, err
}
var doc map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&doc); err != nil {
return nil, err
}
return doc, nil
}
func deleteIndex(client *elasticsearch.Client, indexName string) {
ctx := context.Background()
deleteIndexRequest := esapi.IndicesDeleteRequest{
Index: []string{indexName},
}
res, err := deleteIndexRequest.Do(ctx, client)
if err != nil {
log.Fatalf("无法删除索引:%v", err)
}
defer res.Body.Close()
if res.IsError() {
log.Fatalf("删除索引失败:%s", res.String())
}
log.Printf("索引 %s 删除成功", indexName)
}
使用 Bulk 进行批量操作
Bulk 是 Elasticsearch 提供的一种批量操作 API,它允许您在单个请求中执行多个操作,例如创建、更新和删除文档。Bulk API 可以显著提高索引性能,因为它可以减少网络开销和请求延迟。
使用 Bulk API,可以将多个操作打包到一个请求中,然后将其发送到 Elasticsearch。每个操作都由一个 JSON 对象表示,该对象包含操作类型、文档 ID 和文档数据。Bulk API 支持以下操作类型:
- index:创建新文档或替换现有文档。
- update:更新现有文档的部分内容。
- delete:删除现有文档。
批量创建操作
以下是一个使用 Bulk API 批量创建文档的示例:
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
func bulkCreateDocuments(es *elasticsearch.Client, indexName string, documents []map[string]interface{}) error {
// 构建 Bulk API 请求
var reqs []esutil.BulkIndexerItem
for _, doc := range documents {
req := esutil.BulkIndexerItem{
Action: "index",
Index: indexName,
Body: esutil.NewJSONReader(doc),
}
reqs = append(reqs, req)
}
// 执行 Bulk API 请求
res, err := esutil.BulkIndex(context.Background(), es, esutil.BulkIndex.WithIndex(indexName), esutil.BulkIndex.WithBody(esutil.NewJSONReader(reqs...)))
if err != nil {
return err
}
defer res.Body.Close()
// 检查响应状态码
if res.IsError() {
var errorResponse map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
return fmt.Errorf("Bulk API 返回错误状态码: %s", res.Status())
} else {
return fmt.Errorf("Bulk API 返回错误状态码: %s, 错误信息: %s", res.Status(), errorResponse["error"].(map[string]interface{})["reason"])
}
}
// 解析响应
var response map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return err
}
log.Printf("Bulk API 返回响应: %v", response)
return nil
}
这个 documents 参数是一个包含要创建的文档的 map 切片。
批量更新操作
documents 参数是一个包含要更新的文档 ID 和要更新的字段和值的 map。可以将其传递给 Bulk API 请求的 Body 字段。注意,由于这里使用了 doc 字段来更新文档,因此这里需要将要 更新的字段和值包装在一个名为 doc 的 map 中。
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/elastic/go-elasticsearch/v8/esutil"
)
func bulkUpdateDocuments(es *elasticsearch.Client, indexName string, documents map[string]map[string]interface{}) error {
// 构建 Bulk API 请求
var reqs []esutil.BulkIndexerItem
for id, doc := range documents {
req := esutil.BulkIndexerItem{
Action: "update",
Index: indexName,
Id: id,
Body: esutil.NewJSONReader(map[string]interface{}{"doc": doc}),
}
reqs = append(reqs, req)
}
// 执行 Bulk API 请求
res, err := esutil.BulkIndex(context.Background(), es, esutil.BulkIndex.WithIndex(indexName), esutil.BulkIndex.WithBody(esutil.NewJSONReader(reqs...)))
if err != nil {
return err
}
defer res.Body.Close()
// 检查响应状态码
if res.IsError() {
var errorResponse map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
return fmt.Errorf("Bulk API 返回错误状态码: %s", res.Status())
} else {
return fmt.Errorf("Bulk API 返回错误状态码: %s, 错误信息: %s", res.Status(), errorResponse["error"].(map[string]interface{})["reason"])
}
}
// 解析响应
var response map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&response); err != nil {
return err
}
log.Printf("Bulk API 返回响应: %v", response)
return nil
}